66180853b90c213c5cae5e5de02f499b437d609e,xap-core/xap-datagrid/src/main/java/com/gigaspaces/internal/cluster/node/impl/backlog/multisourcesinglefile/MultiSourceSingleFileReliableAsyncGroupBacklog.java,MultiSourceSingleFileReliableAsyncGroupBacklog,updateReliableAsyncState,#IReliableAsyncState#String#,195
Before Change
MultiSourceSingleFileConfirmationHolder lastConfirmedBySource = getConfirmationHolderUnsafe(sourceMemberName);
if (!lastConfirmedBySource.hadAnyHandshake()
|| lastConfirmedBySource.getLastConfirmedKey() < minConfirmed) {
decreaseWeight(sourceMemberName, lastConfirmedBySource.getLastConfirmedKey(), minConfirmed);
lastConfirmedBySource.setLastConfirmedKey(minConfirmed);
}
clearConfirmedPackets();
} finally {
After Change
}
@Override
public void updateReliableAsyncState(IReliableAsyncState reliableAsyncState, String sourceMemberName) throws NoSuchReplicationMemberException, MissingReliableAsyncTargetStateException {
MultiSourceSingleFileReliableAsyncState sharedAsyncState = (MultiSourceSingleFileReliableAsyncState) reliableAsyncState;
_rwLock.writeLock().lock();
try {
if (_logger.isLoggable(Level.FINEST))
_logger.finest(getLogPrefix()
+ "incoming reliable async state update "
+ Arrays.toString(sharedAsyncState.getAsyncTargetsState()));
ReliableAsyncSourceGroupConfig sourceGroupConfig = getGroupConfigSnapshot();
//This must be done before updating any member state, otherwise it can cause deletion of packets
//that are not confirmed by the unknown member
validateReliableAsyncUpdateTargetsMatch(reliableAsyncState, sourceMemberName);
for (MultiSourceAsyncTargetState asyncTargetState : sharedAsyncState.getAsyncTargetsState()) {
if (asyncTargetState.hadAnyHandshake()) {
long lastConfirmedKey = asyncTargetState.getLastConfirmedKey();
long lastReceivedKey = asyncTargetState.getLastReceivedKey();
String targetMemberName = asyncTargetState.getTargetMemberName();
final MultiSourceSingleFileConfirmationHolder confirmationHolder = getConfirmationHolderUnsafe(targetMemberName);
confirmationHolder.setLastConfirmedKey(lastConfirmedKey, targetMemberName, this);
confirmationHolder.setLastReceivedKey(lastReceivedKey);
}
}
long minConfirmed = getNextKeyUnsafe() - 1;
// We update all the sync keepers state to the minimum confirmed key
// such that in case of a failover they will receive the proper
// backlog
if (sharedAsyncState.getAsyncTargetsState().length > 0) {
minConfirmed = sharedAsyncState.getMinimumUnconfirmedKey() - 1;
if (getNextKeyUnsafe() < minConfirmed + 1)
setNextKeyUnsafe(minConfirmed + 1);
}
MultiSourceSingleFileConfirmationHolder lastConfirmedBySource = getConfirmationHolderUnsafe(sourceMemberName);
if (!lastConfirmedBySource.hadAnyHandshake()
|| lastConfirmedBySource.getLastConfirmedKey() < minConfirmed) {
lastConfirmedBySource.setLastConfirmedKey(minConfirmed, sourceMemberName, this);
}
clearConfirmedPackets();
} finally {